<?php
/**
 * Created by PhpStorm.
 * User: tangzhixing
 * Date: 2021/4/6
 * Time: 11:03
 */
namespace Asf\MessageQueue;

use Asf\Contracts\Queue\MessageQueue;
use Asf\Database\RedisPool;
use Swoole\Process;

/**
 * 基于Redis实现的消息队列，使用该类必须开启redis服务
 * Class RedisMessageQueue
 * @package Asf\MessageQueue
 */
class RedisMessageQueue
{
    protected $redis;
    public function __construct(RedisPool $redisPool)
    {
        $this->redis = $redisPool;
    }

    /**
     * 启动队列进程的主方法
     * @param array $messageList   消息队列消费者的数组集合，key为该消费者对应redis的key，value为该消费者的实例
     */
    public function run($messageList, $delayList)
    {
        foreach ($messageList as $k => $v){
            $this->consumer($v, $k);
        }
        foreach ($delayList as $k => $v){
            $this->delayConsumer($v, $k);
        }
        while (true) {
            sleep(3);
        }
        //\Swoole\Process::wait();
        //使用Process作为监控父进程，创建管理子进程时，父类必须注册信号SIGCHLD对退出的进程执行wait，否则子进程退出时会变成僵尸进程
        Process::signal(SIGCHLD, function ($signo) {
            //false 非阻塞模式
            while ($ret = Process::wait(false)) {
                var_dump($ret);
            }
        });
    }

    /**
     *  启动处理队列逻辑的子进程
     * @param MessageQueue $messageQueue 消费者的实例
     * @param string $key  消费者对应redis的key
     */
    public function consumer(MessageQueue $messageQueue, $key)
    {
        //2、此子进程用于消费队列
        $process2 = new Process(function (\Swoole\Process $process) use($messageQueue, $key) {

            while (true) {
                $redis = $this->redis->connect();

                //延迟消息队列  延迟消息队列要重写 在配置文件新配置一个延迟对列数组 使用Zset的形式看能不能实现
//                $delay = $redis->get(my_env('QUEUE_PREFIX') . '_' .$key);
//                if (!empty($delay)){
//                    sleep(2);
//                    continue;
//                }

                //从队列中提取数据
                $data = $redis->rPop($key);
                if ($data) {
                    go(function () use($data,$messageQueue,$redis,$key){
                        $result = $messageQueue->handle($data);
                        //如果处理是失败，返回队列排队
                        if (!$result){
                            $redis->lPush('error_' . $key, $data);
                        }
                    });
                }else{
                    sleep(3);
                }
                $this->redis->close($redis);

            }
        }, false, SOCK_STREAM, true);
        $process2->useQueue(2);
        echo '子进程 id:' . $process2->start() . PHP_EOL;
    }


    public function delayConsumer(MessageQueue $messageQueue, $key)
    {
        //2、此子进程用于延迟消费队列
        $process2 = new Process(function (\Swoole\Process $process) use($messageQueue, $key) {

            while (true) {
                $redis = $this->redis->connect();

                //从队列中提取数据
                $data = $redis->zrange($key, 0, -1, true);
                $this->redis->close($redis);

                foreach ($data as $k => $v){
                    if ($v <= time()){
                        go(function () use($data, $messageQueue, $key, $k){
                            $result = $messageQueue->handle($k);
                            //协程内不能与外部共用一个redis连接,所以要新建一个
                            $redis = $this->redis->connect();
                            //如果处理是失败，返回队列排队
                            if (!$result){
                                $redis->lPush('error_' . $key, $k);
                            }else{
                                $redis->zrem($key,$k);
                            }
                            $this->redis->close($redis);
                        });
                    }else{
                        sleep(2);
                        break;
                    }
                }

            }
        }, false, SOCK_STREAM, true);
        $process2->useQueue(2);
        echo '子进程 id:' . $process2->start() . PHP_EOL;
    }

}